Big Data Spark Ecosystem | The Path of Cultivation | Spark Blog

2019-05-15 | Docs Language: Simplified Chinese & English | Programming: Spark | Website: www.geekparkhub.com | OpenSource | GeekDeveloper: JEEP-711 | GitHub: github.com/geekparkhub | Gitee: gitee.com/geekparkhub

🐘 Spark Technology | The Path of Cultivation 🐘



🔥 1. Spark Fundamentals 🔥

1.1 Spark Overview

1.1.1 Spark Modules


1.1.2 Spark Features

1.1.3 Spark Use Cases

1.2 Spark Deployment

Extract spark-2.1.1-bin-hadoop2.7.tgz

[root@systemhub511 software]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/

Rename spark-2.1.1-bin-hadoop2.7 to spark

[root@systemhub511 module]# mv spark-2.1.1-bin-hadoop2.7/ spark

1.3 Spark Run Modes

💥 1.3.1 Local Mode 💥

1.3.1.1 Local Mode Overview
1.3.1.2 Official Examples: (Computing π) & (WordCount) & (Local Debugging)
bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 3.059446 s
Pi is roughly 3.1411463141146316
[root@systemhub511 spark]# bin/spark-shell
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = local[*], app id = local-1558677071165).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala>


[root@systemhub511 spark]# mkdir -p input/wordcount
[root@systemhub511 spark]# cd input/wordcount/
[root@systemhub511 wordcount]# vim wordcount_001.txt
hadoop spark hive
hadoop spark hadoop
hbase flume hive
scala java oozie
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (spark,2), (hive,2), (hadoop,3), (oozie,1), (flume,1), (java,1), (hbase,1))
scala>
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("./output/wordcount/")
[root@systemhub511 spark]# cd output/wordcount/
[root@systemhub511 wordcount]# ll
total 4
-rw-r--r--. 1 root root 79 May 24 14:48 part-00000
-rw-r--r--. 1 root root 0 May 24 14:48 _SUCCESS
[root@systemhub511 wordcount]# cat part-00000
(scala,1)
(spark,2)
(hive,2)
(hadoop,3)
(oozie,1)
(flume,1)
(java,1)
(hbase,1)
[root@systemhub511 wordcount]#
1.3.1.3 Submission Flow


1.3.1.4 Data Flow

| Operator | Description |
| --- | --- |
| textFile("input") | Reads the data from the local input directory |
| flatMap(_.split(" ")) | Flattening: splits each line on spaces and maps it into individual words |
| map((_,1)) | Operates on every element, mapping each word to a (word, 1) tuple |
| reduceByKey(_+_) | Aggregates (sums) the values by key |
| collect | Collects the data to the Driver for display |


💥 1.3.2 Standalone Mode 💥

1.3.2.1 Standalone Mode Overview


1.3.2.2 Standalone Mode QuickStart
[root@systemhub511 spark]# cd conf/
[root@systemhub511 conf]# mv slaves.template slaves
[root@systemhub511 conf]# mv spark-env.sh.template spark-env.sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
systemhub511
systemhub611
systemhub711
# Options for the daemons used in the standalone deploy mode
SPARK_MASTER_HOST=systemhub511
SPARK_MASTER_PORT=7077
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 spark]# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-systemhub511.out
systemhub711: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub711.out
systemhub611: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub611.out
systemhub511: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub511.out
[root@systemhub511 spark]#
[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
30651 org.apache.spark.deploy.worker.Worker
30443 org.apache.spark.deploy.master.Master
813 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
10369 org.apache.spark.deploy.worker.Worker
11777 sun.tools.jps.Jps
================ root@systemhub711 All Processes ===========
8960 org.apache.spark.deploy.worker.Worker
10364 sun.tools.jps.Jps
[root@systemhub511 spark]#
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://systemhub511:7077 \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master spark://systemhub511:7077 \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 6.478381 s
Pi is roughly 3.1405883140588315
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = spark://systemhub511:7077, app id = app-20190524174512-0001).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (hive,2), (oozie,1), (java,1), (spark,2), (hadoop,3), (flume,1), (hbase,1))
scala>


[root@systemhub511 conf]# mv spark-defaults.conf.template spark-defaults.conf
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
spark.eventLog.dir: all information produced while an Application runs is recorded under the path specified by this property.
spark.history.ui.port=18080: the History Server Web UI is served on port 18080.
spark.history.fs.logDirectory=hdfs://systemhub511:9000/directory: once this property is configured, there is no need to specify the path explicitly when running start-history-server.sh; the Spark History Server only shows information under the specified path.
spark.history.retainedApplications=30: the number of Application histories to retain; when this number is exceeded, the oldest application information is deleted. This is the number of applications kept in memory, not the number shown on the page.
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 hadoop]# sbin/start-dfs.sh
[root@systemhub511 spark]# hadoop fs -mkdir /directory
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub511 spark]# sbin/start-history-server.sh
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect


1.3.2.3 Spark HA (High Availability)


# SPARK_MASTER_HOST=systemhub511
# SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=systemhub511,systemhub611,systemhub711 -Dspark.deploy.zookeeper.dir=/spark"
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 spark]# /opt/module/hadoop/sbin/start-dfs.sh
[root@systemhub511 spark]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub611 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub711 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub611 ~]# /opt/module/spark/sbin/start-master.sh
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077,systemhub611:7077

http://systemhub511:8080 | systemhub511 node status is ALIVE
http://systemhub611:8080 | systemhub611 node status is STANDBY


[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
32242 org.apache.hadoop.hdfs.server.namenode.NameNode
11206 org.apache.spark.deploy.master.Master
11368 org.apache.spark.deploy.worker.Worker
9705 org.apache.zookeeper.server.quorum.QuorumPeerMain
32444 org.apache.hadoop.hdfs.server.datanode.DataNode
5228 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
9157 org.apache.spark.deploy.master.Master
8901 org.apache.spark.deploy.worker.Worker
2822 sun.tools.jps.Jps
30214 org.apache.hadoop.hdfs.server.datanode.DataNode
7495 org.apache.zookeeper.server.quorum.QuorumPeerMain
================ root@systemhub711 All Processes ===========
5312 org.apache.spark.deploy.worker.Worker
31568 sun.tools.jps.Jps
26869 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
26647 org.apache.hadoop.hdfs.server.datanode.DataNode
4014 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@systemhub511 spark]#
[root@systemhub511 spark]# kill -9 11206


💥 1.3.3 Yarn Mode 💥

1.3.3.1 Yarn Mode Overview


1.3.3.2 YarnMode QuickStart
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
spark.yarn.historyServer.address=systemhub511:18080
spark.history.ui.port=18080
<!-- Whether to start a thread that checks the amount of physical memory each task is using; if a task exceeds its allocation it is killed directly. Default is true. -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!-- Whether to start a thread that checks the amount of virtual memory each task is using; if a task exceeds its allocation it is killed directly. Default is true. -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

💥 1.3.4 Mesos Mode 💥

1.3.4.1 Mesos Mode Overview

💥 1.3.5 Run Mode Comparison 💥

| Mode | Number of Machines | Cluster Processes | Owner |
| --- | --- | --- | --- |
| Local Mode | 1 |  | Spark |
| Standalone Mode | 3 | Master & Worker | Spark |
| Yarn Mode | 1 | Yarn & HDFS | Hadoop |

💥 1.3.6 WordCount Example 💥

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark_server</artifactId>
<groupId>com.geekparkhub.core.spark</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-common</artifactId>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.geekparkhub.core.spark.application.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub
 * Website | https://www.geekparkhub.com/
 * Description | Open · Creation | Open source turns dreams into reality; GeekParkHub builds the unprecedented together
 * HackerParkHub
 * Website | https://www.hackerparkhub.org/
 * Description | Exploring unknown technology with a fearless spirit of discovery
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * <p>
 * WordCountApplication
 * <p>
 */
object WordCount {

  def main(args: Array[String]): Unit = {

    // Create SparkConf: the master and application name come from the submit arguments
    val sparkConf = new SparkConf().setMaster(args(0)).setAppName("WordCountApplication")

    // Create SparkContext from the SparkConf
    val sc = new SparkContext(sparkConf)

    // Read the input file
    val line: RDD[String] = sc.textFile(args(1))

    // Flatten each line into words
    val word: RDD[String] = line.flatMap(_.split(" "))

    // Map each word to a (word, 1) tuple
    val wordAndOne: RDD[(String, Int)] = word.map((_, 1))

    // Count the total occurrences of each word
    val wordCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // Write the result out to a file
    wordCount.saveAsTextFile(args(2))

    // Release resources
    sc.stop()
  }
}
[root@systemhub511 ~]# hadoop fs -mkdir -p /core_flow/spark/input/wordcount
hadoop fs -put /opt/module/spark/input/wordcount/wordcount_001.txt /core_flow/spark/input/wordcount
bin/spark-submit \
--class com.geekparkhub.core.spark.application.wordcount.WordCount \
--master yarn \
./lib_jar/WordCount.jar yarn \
/core_flow/spark/input/wordcount/wordcount_001.txt \
/core_flow/spark/output/wordcount
[root@systemhub511 spark]# hadoop fs -ls -R /core_flow/spark/output/wordcount/
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/_SUCCESS
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00000
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00001
[root@systemhub511 spark]#
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00000
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(scala,1)
(hive,2)
(oozie,1)
(java,1)
[root@systemhub511 spark]#
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00001
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(spark,2)
(hadoop,3)
(flume,1)
(hbase,1)
[root@systemhub511 spark]#

🔥 1.3 Spark Core 🔥

1.3.1 RDD Overview

1.3.1.1 What Is an RDD

RDD (Resilient Distributed Dataset) is Spark's most basic data abstraction. In the code it is an abstract class; it represents a resilient, immutable, partitionable collection whose elements can be computed in parallel.

1.3.1.2 RDD Properties
* Internally, each RDD is characterized by five main properties:
*
* - 1. A list of partitions
* - 2. A function for computing each split
* - 3. A list of dependencies on other RDDs
* - 4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - 5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

1. A list of partitions, the basic units that make up the dataset;
2. A function for computing each partition;
3. A list of dependencies on other RDDs;
4. A Partitioner, i.e. the RDD's partitioning function (for key-value RDDs);
5. A list storing the preferred location(s) for computing each partition.
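
These five properties can be inspected directly on any RDD from the spark-shell. A minimal, illustrative sketch (outputs omitted; the compute function is invoked internally by Spark rather than called by hand):

scala> val rdd = sc.parallelize(1 to 16, 4)
scala> rdd.partitions.length                      // 1. the list of partitions
scala> rdd.dependencies                           // 3. dependencies on other RDDs
scala> rdd.partitioner                            // 4. optional Partitioner (None for a non key-value RDD)
scala> rdd.preferredLocations(rdd.partitions(0))  // 5. preferred locations of a partition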

1.3.1.3 RDD Characteristics

An RDD represents a read-only, partitioned dataset. To modify an RDD you can only use transformation operations, deriving a new RDD from an existing one; the new RDD carries the information needed to derive it from the other RDDs. Dependencies therefore exist between RDDs, and RDD execution is computed lazily along this lineage. If the lineage becomes long, it can be cut by persisting an RDD.

1.3.1.3.1 Resilient
1.3.1.3.2 Partitioned

An RDD is logically partitioned; the data of each partition exists only abstractly, and at computation time a compute function is used to obtain each partition's data. If the RDD is built from an existing file system, compute reads the data from that file system; if the RDD is derived from another RDD, compute applies the transformation logic to the other RDD's data.

1.3.1.3.3 Read-only

RDDs are read-only: to change the data in an RDD, you can only create a new RDD based on the existing one.

Converting one RDD into another can be done through a rich set of operators, no longer limited to writing only map and reduce as in MapReduce.

RDD operators fall into two classes: transformations, which convert one RDD into another and build up the RDD lineage, and actions, which trigger RDD computation to obtain a result or save the RDD to a file system.

1.3.1.3.4 Dependencies

(figure: RDD dependency / lineage diagram)

As the figure shows, RDDs are converted through operators; a newly derived RDD carries the information needed to derive it from the other RDDs. RDDs maintain this lineage relationship between them, which is also called a dependency.

There are two kinds of dependency: a narrow dependency, where the partitions of the RDDs correspond one to one, and a wide dependency, where each partition of the downstream RDD is related to every partition of the upstream (parent) RDD, a many-to-many relationship.
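
A quick way to see both kinds of dependency from the spark-shell (illustrative; outputs omitted):

scala> val words = sc.parallelize(List("a", "b", "a"))
scala> val pairs = words.map((_, 1))              // narrow dependency (OneToOneDependency)
scala> val counts = pairs.reduceByKey(_ + _)      // wide dependency (ShuffleDependency)
scala> pairs.dependencies
scala> counts.dependencies
scala> counts.toDebugString                       // prints the full lineage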

1.3.1.3.5 Caching

(figure: RDD caching and lineage)

If the same RDD is used several times in an application, it can be cached: the RDD is computed from its lineage only the first time, and wherever it is used afterwards it is taken directly from the cache rather than recomputed from the lineage, which speeds up later reuse.

As the figure shows, RDD-1 goes through a series of transformations to obtain RDD-n, which is saved to HDFS. RDD-1 produces an intermediate result along the way; if that result is cached in memory, then in the subsequent transformation from RDD-1 to RDD-m, the RDDs before it (such as RDD-0) are not recomputed.
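
A minimal caching sketch in the spark-shell (illustrative; the second action reuses the cached partitions instead of recomputing the lineage):

scala> val text = sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt")
scala> val words = text.flatMap(_.split(" ")).cache()   // only marks the RDD for caching (lazy)
scala> words.count()     // the first action computes the lineage and fills the cache
scala> words.collect()   // later actions read the cached partitions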

1.3.1.3.6 CheckPoint

Although RDD lineage naturally provides fault tolerance (when a partition's data fails or is lost it can be rebuilt through the lineage), for long-running iterative applications the lineage between RDDs grows longer and longer as the iterations proceed; once an error occurs in a later iteration, rebuilding through a very long lineage is required, which inevitably hurts performance.

For this reason, RDDs support checkpointing, which saves the data to persistent storage and thereby cuts the earlier lineage: a checkpointed RDD no longer needs to know its parent RDDs, because it can read its data from the checkpoint.
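
A minimal checkpoint sketch in the spark-shell (illustrative; the HDFS checkpoint directory below is an assumed example path):

scala> sc.setCheckpointDir("hdfs://systemhub511:9000/checkpoint")   // assumed example directory
scala> val rdd = sc.parallelize(1 to 100).map((_, 1))
scala> rdd.checkpoint()      // only marks the RDD for checkpointing (lazy)
scala> rdd.count()           // the action triggers the checkpoint write
scala> rdd.toDebugString     // the lineage now starts from the checkpoint data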

1.3.2 RDD Programming

1.3.2.1 Programming Model

In Spark, an RDD is represented as an object, and RDDs are transformed by calling methods on these objects. After an RDD has been defined through a series of transformations, an action can be invoked to trigger its computation; an action either returns a result to the application (count, collect, etc.) or saves data to a storage system (saveAsTextFile, etc.).
In Spark, RDD computation is executed only when an action is encountered (i.e. lazy evaluation), which lets multiple transformations be pipelined at run time.
To use Spark, a developer writes a Driver program, which is submitted to the cluster to schedule and run the Workers. The Driver defines one or more RDDs and invokes actions on them; the Workers execute the RDD partition computation tasks.
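
A small illustration of lazy evaluation in the spark-shell (illustrative; nothing runs until the action on the last line):

scala> val nums = sc.parallelize(1 to 10)
scala> val doubled = nums.map(_ * 2)            // transformation: only records the lineage
scala> val evens = doubled.filter(_ % 4 == 0)   // transformation: still nothing executed
scala> evens.collect()                          // action: triggers the pipelined computation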

1.3.2.2 Creating RDDs
1.3.2.2.1 Creating an RDD from a collection
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
scala> val makerdd = sc.makeRDD(Array(511,611,711))
makerdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> makerdd.collect
res1: Array[Int] = Array(511, 611, 711)
scala>
1.3.2.2.2 Creating an RDD from a dataset in external storage
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt")
res2: org.apache.spark.rdd.RDD[String] = /opt/module/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[3] at textFile at <console>:25
scala>
1.3.2.2.3 Creating an RDD from another RDD
1.3.2.3 RDD Transformations
1.3.2.3.1 Value Types
1.3.2.3.1.1 map(func) Method
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
scala> rdd.map((_,1)).collect
res4: Array[(Int, Int)] = Array((511,1), (611,1), (711,1))
scala>
scala> rdd.map((_*2)).collect
res5: Array[Int] = Array(1022, 1222, 1422)
scala>
1.3.2.3.1.2 mapPartitions(func) Method
scala> rdd.mapPartitions(_.map(_*2)).collect
res11: Array[Int] = Array(1022, 1222, 1422)
scala>
1.3.2.3.1.3 mapPartitionsWithIndex(func) Method
scala> rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))).collect
res13: Array[(Int, Int)] = Array((1,511), (2,611), (3,711))
scala>
1.3.2.3.1.4 flatMap(func) Method
scala> val text = sc.textFile("/core_flow/spark/input/wordcount/wordcount_001.txt")
text: org.apache.spark.rdd.RDD[String] = /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> text.flatMap(_.split(" ")).collect
res16: Array[String] = Array(hadoop, spark, hive, hadoop, spark, hadoop, hbase, flume, hive, scala, java, oozie)
scala>
1.3.2.3.1.5 Difference between map() and mapPartitions()
1.3.2.3.1.6 glom Method
scala> rdd.glom.collect
res17: Array[Array[Int]] = Array(Array(), Array(511), Array(611), Array(711))
scala>
1.3.2.3.1.7 groupBy(func) Method
scala> rdd.groupBy(_ % 2).collect
res18: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(611, 711, 511)))
scala>
1.3.2.3.1.8 filter(func) Method
scala> rdd.filter(_%3==0).collect
res20: Array[Int] = Array(711)
scala>
1.3.2.3.1.9 sample(withReplacement,fraction,seed) Method
scala> val rdd = sc.parallelize(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> rdd.sample(false,0.1,3).collect
res22: Array[Int] = Array(1, 33, 37, 50, 59, 69, 75, 78, 85, 98)
scala>
1.3.2.3.1.10 distinct([numTasks])) Method
scala> rdd.distinct(4).collect
res23: Array[Int] = Array(84, 100, 96, 52, 56, 4, 76, 16, 28, 80, 48, 32, 36, 24, 64, 92, 40, 72, 8, 12, 20, 60, 44, 88, 68, 13, 41, 61, 81, 21, 77, 53, 97, 25, 29, 65, 73, 57, 93, 33, 37, 45, 1, 89, 17, 69, 9, 85, 49, 5, 34, 82, 66, 22, 54, 98, 46, 30, 14, 50, 62, 42, 74, 90, 6, 70, 18, 38, 86, 58, 78, 26, 94, 10, 2, 19, 39, 15, 47, 71, 55, 95, 79, 59, 11, 35, 27, 75, 51, 23, 63, 83, 67, 3, 7, 91, 31, 87, 43, 99)
scala>
1.3.2.3.1.11 coalesce(numPartitions) Method
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> rdd.partitions.size
res24: Int = 4
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[28] at coalesce at <console>:26
scala> coalesceRDD.partitions.size
res25: Int = 3
scala>
1.3.2.3.1.12 repartition(numPartitions) Method
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.partitions.size
res26: Int = 4
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at repartition at <console>:26
scala> rerdd.partitions.size
res27: Int = 2
scala>
1.3.2.3.1.13 Difference between coalesce and repartition

1. coalesce repartitions the RDD and lets you choose whether a shuffle is performed, controlled by the parameter shuffle: Boolean = false/true.

2. repartition actually calls coalesce with the shuffle enabled, as the source shows:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
1.3.2.3.1.14 sortBy(func,[ascending],[numTasks]) Method
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd.sortBy(x => x).collect()
res29: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x => x%3).collect()
res30: Array[Int] = Array(3, 1, 4, 2)
scala> rdd.sortBy(x => x,false).collect()
res31: Array[Int] = Array(4, 3, 2, 1)
scala>
1.3.2.3.1.15 pipe(command,[envVars]) Method
[root@systemhub511 ~]# vim /opt/module/spark/input/pipe.sh
[root@systemhub511 ~]# chmod 777 /opt/module/spark/input/pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
  echo ">>>" ${LINE}
done
scala> rdd.pipe("/opt/module/spark/input/pipe.sh").collect
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
scala>
1.3.2.3.2 Operations Between Two Value-Type RDDs
1.3.2.3.2.1 union(otherDataset) Method
scala> var rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> var rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.union(rdd2).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
scala>
1.3.2.3.2.2 subtract(otherDataset) Method
scala> rdd1.subtract(rdd2).collect
res0: Array[Int] = Array(2, 4, 1, 3)
scala>
1.3.2.3.2.3 intersection(otherDataset) Method
scala> rdd1.intersection(rdd2).collect
res1: Array[Int] = Array(5)
scala>
1.3.2.3.2.4 cartesian(otherDataset) Method
scala> rdd1.cartesian(rdd2).collect
res2: Array[(Int, Int)] = Array((1,5), (1,6), (1,7), (2,5), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,5), (3,6), (3,7), (4,5), (4,6), (4,7), (5,5), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
scala>
1.3.2.3.2.5 zip(otherDataset) Method
scala> rdd1.zip(rdd2).collect
res4: Array[(Int, Int)] = Array((1,6), (2,7), (3,8), (4,9), (5,10))
scala>
1.3.2.3.3 Key-Value Types
1.3.2.3.3.1 partitionBy Method
scala> val rdd1 = sc.parallelize(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),4)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.mapPartitionsWithIndex((i,t)=>t.map((i,_))).collect
res3: Array[(Int, (Int, String))] = Array((0,(1,A)), (1,(2,B)), (2,(3,C)), (3,(4,D)))
scala> rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
res5: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at partitionBy at <console>:27
scala> res5.partitions.size
res6: Int = 2
scala>
1.3.2.3.3.2 reduceByKey(func,[numTasks]) Method
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd.reduceByKey((x,y)=>x+y).collect
res7: Array[(String, Int)] = Array((female,6), (male,7))
scala>
1.3.2.3.3.3 groupByKey Method
scala> rdd.groupByKey(2).collect
res8: Array[(String, Iterable[Int])] = Array((female,CompactBuffer(5, 1)), (male,CompactBuffer(5, 2)))
scala>
1.3.2.3.3.4 Difference between reduceByKey and groupByKey

1. reduceByKey: aggregates by key, with a combine (pre-aggregation) step before the shuffle; the result is an RDD[K, V].

2. groupByKey: groups by key and shuffles directly, with no pre-aggregation.

3. Guidance: reduceByKey performs better than groupByKey thanks to the map-side pre-aggregation, so reduceByKey is recommended; but check whether it affects your business logic.

1.3.2.3.3.5 aggregateByKey Method

Parameters: (zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U)

1. Purpose: in an RDD of (k, v) pairs, the values are grouped and merged by key. During merging, each value and the initial value are passed as arguments to the seq function, and the result forms a new (k, v) pair; the results are then merged again by key, and finally the values of each group are passed to the combine function (the first two values are combined, then the result and the next value are passed to combine, and so on); the key and the final result are output as a new (k, v) pair.

2. Parameter description:
zeroValue: the initial value given to every key in every partition.
seqOp: a function used to iteratively fold the values within a single partition, starting from the initial value.
combOp: a function used to merge the results of the individual partitions.


scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.aggregateByKey(0)(math.max(_,_),_+_).collect
res9: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
scala>
scala> rdd.aggregateByKey(0)(_+_,_+_).collect
res10: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala> rdd.reduceByKey(_+_).collect
res11: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala>
1.3.2.3.3.6 foldByKey Method
scala> rdd.foldByKey(0)(_+_).collect
res12: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala>
1.3.2.3.3.7 combineByKey[C] Method


scala> rdd.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).collect
res15: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3)))
scala>
1.3.2.3.3.8 sortByKey([ascending],[numTasks]) Method
scala> rdd.sortByKey().collect
res17: Array[(String, Int)] = Array((a,3), (a,2), (b,3), (c,6), (c,8), (c,4))
scala> rdd.sortByKey(false).collect
res19: Array[(String, Int)] = Array((c,4), (c,6), (c,8), (b,3), (a,3), (a,2))
scala>
1.3.2.3.3.9 mapValues Method
scala> rdd.mapValues(_*2).collect
res20: Array[(String, Int)] = Array((a,6), (a,4), (c,8), (b,6), (c,12), (c,16))
scala>
1.3.2.3.3.10 join(otherDataset,[numTasks]) Method
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd.join(rdd1).collect
res21: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
scala> rdd.leftOuterJoin(rdd1).collect
res22: Array[(Int, (String, Option[Int]))] = Array((1,(a,Some(4))), (2,(b,Some(5))), (3,(c,Some(6))))
scala> rdd.rightOuterJoin(rdd1).collect
res23: Array[(Int, (Option[String], Int))] = Array((1,(Some(a),4)), (2,(Some(b),5)), (3,(Some(c),6)))
scala>
1.3.2.3.3.11 cogroup(otherDataset,[numTasks]) Method
scala> rdd.cogroup(rdd1).collect
res24: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
scala>
1.3.2.4 Action
1.3.2.4.1 reduce(func) Method
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> rdd.reduce(_+_)
res25: Int = 55
scala>
1.3.2.4.2 collect() Method
scala> rdd.collect
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala>
1.3.2.4.3 count() Method
scala> rdd.count
res27: Long = 10
scala>
1.3.2.4.4 first() Method
scala> rdd.first
res28: Int = 1
scala>
1.3.2.4.5 take(n) Method
scala> rdd.take(2)
res30: Array[Int] = Array(1, 2)
scala>
1.3.2.4.6 takeOrdered(n) Method
scala> rdd.takeOrdered(3)
res31: Array[Int] = Array(1, 2, 3)
scala>
1.3.2.4.7 aggregate Method

Parameters: (zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)

Purpose: the aggregate function folds the elements within each partition using seqOp and the initial value, and then uses the combine function to combine each partition's result with the initial value (zeroValue). The final return type of this function does not need to match the element type of the RDD.

scala> rdd.aggregate(0)(_+_,_+_)
res32: Int = 55
scala>
1.3.2.4.8 fold(num)(func) Method
scala> rdd.fold(0)(_+_)
res34: Int = 55
scala>
1.3.2.4.9 saveAsTextFile(path) Method
1.3.2.4.10 saveAsSequenceFile(path) Method
1.3.2.4.11 saveAsObjectFile(path) Method
1.3.2.4.12 countByKey() Method
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[35] at parallelize at <console>:24
scala> rdd.countByKey
res35: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
scala>
1.3.2.4.13 foreach(func) Method
scala> rdd.foreach(print)
1.3.2.5 Passing Functions to RDDs

In real development you often need to define your own operations on RDDs. The key point to keep in mind is that initialization happens on the Driver, while the program actually runs on the Executors; this involves cross-process communication, and cross-process communication requires serialization.
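
A minimal sketch of the issue (the class name Search is hypothetical, not from this blog's codebase): a method defined inside a class references a field of that class, so the whole instance is captured by the closure and shipped to the Executors; unless the class is serializable, the job fails with a "Task not serializable" error.

// Hypothetical example: Search is illustrative and must extend Serializable,
// because getMatches references this.query and `this` is sent to the Executors.
class Search(val query: String) extends Serializable {
  def getMatches(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] =
    rdd.filter(_.contains(query))
}

// Usage from the Driver / spark-shell:
// val matched = new Search("hadoop").getMatches(sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt"))
// matched.collect()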

🔒 Not yet unlocked, still being explored… stay tuned for Blog updates! 🔒

1.3.2.5.1 Passing Methods
1.3.2.5.2 Passing Fields
1.3.2.6 RDD Dependencies
1.3.2.6.1 Lineage
1.3.2.6.2 Narrow Dependencies
1.3.2.6.3 Wide Dependencies
1.3.2.6.4 DAG
1.3.2.6.5 Task Division (Key Point)
1.3.2.6.7 RDD Caching
1.3.2.6.8 RDD CheckPoint

1.3.3 Key-Value RDD Data Partitioning

1.3.4 Data Reading and Saving

1.3.5 Advanced RDD Programming

🔥 1.4 Spark SQL 🔥

1.4.1 Spark SQL Overview

1.4.2 Spark SQL Queries

1.4.3 DataFrame

1.4.4 DataSet

1.4.5 Aggregate Functions

1.4.6 Spark SQL Data Sources

1.4.7 OLAP Server

1.4.8 Spark SQL Examples

🔥 1.5 Spark Streaming 🔥

1.5.1 Spark Streaming Overview

1.5.2 Spark Streaming Program

1.5.3 DataStream Overview

1.5.4 DataStream Input

1.5.5 DataStream Transformations

1.5.6 DataStream Output

1.5.7 Running 7*24 Hours

1.5.8 Spark Streaming Examples

🔥 2. Advanced Spark 🔥

2.1 Internals

2.2 Performance Tuning

3. The Path of Cultivation: Iterating the Technical Architecture Toward Its Peak



💡 How to contribute to this open-source document 💡

  1. Most of the Blog content is typed by hand, so typos are unavoidable; you can help me find them.

  2. There are many topics I may not have covered, so you can contribute additional topics.

  3. The existing content will inevitably be incomplete or contain mistakes, so you can revise or extend what is already there.

  4. 💡 Contributions of open-source blogs, notes, articles, snippets, shares, ideas, OpenSource Projects, Code & Code Review from any field are welcome.

  5. 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈 issues: geekparkhub.github.io/issues 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈

The hope is that every article helps readers and improves their skills; this is every author's original intention.


💌 Thank you for reading; your comments and suggestions are welcome 💌

Donate: the project's growth cannot happen without your support, so buy the developer a cup of ☕Coffee☕!


Acknowledgements

Please note your UserName when donating

| ID | UserName | Donation | Money | Consume |
| --- | --- | --- | --- | --- |
| 1 | Object | WeChatPay | 5 RMB | A cup of cola |
| 2 | 泰迪熊看月亮 | AliPay | 20 RMB | A cup of coffee |
| 3 | 修仙道长 | WeChatPay | 10 RMB | Two cups of cola |

License

Apache License Version 2.0